1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent.testing;
18
19 import com.google.common.annotations.Beta;
20 import com.google.common.util.concurrent.ListenableFuture;
21
22 import junit.framework.TestCase;
23
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32
33
34
35
36
37
38
39
40 @Beta
41 public abstract class AbstractListenableFutureTest extends TestCase {
42
43 protected CountDownLatch latch;
44 protected ListenableFuture<Boolean> future;
45
46 @Override
47 protected void setUp() throws Exception {
48
49
50 latch = new CountDownLatch(1);
51 future = createListenableFuture(Boolean.TRUE, null, latch);
52 }
53
54 @Override
55 protected void tearDown() throws Exception {
56
57
58 latch.countDown();
59 }
60
61
62
63
64
65 protected abstract <V> ListenableFuture<V> createListenableFuture(
66 V value, Exception except, CountDownLatch waitOn);
67
68
69
70
71
72 public void testGetBlocksUntilValueAvailable() throws Throwable {
73
74 assertFalse(future.isDone());
75 assertFalse(future.isCancelled());
76
77 final CountDownLatch successLatch = new CountDownLatch(1);
78 final Throwable[] badness = new Throwable[1];
79
80
81 new Thread(new Runnable() {
82 @Override
83 public void run() {
84 try {
85 assertSame(Boolean.TRUE, future.get());
86 successLatch.countDown();
87 } catch (Throwable t) {
88 t.printStackTrace();
89 badness[0] = t;
90 }
91 }}).start();
92
93
94 latch.countDown();
95
96 assertTrue(successLatch.await(10, TimeUnit.SECONDS));
97
98 if (badness[0] != null) {
99 throw badness[0];
100 }
101
102 assertTrue(future.isDone());
103 assertFalse(future.isCancelled());
104 }
105
106
107
108
109
110 public void testTimeoutOnGetWorksCorrectly() throws InterruptedException,
111 ExecutionException {
112
113
114 try {
115 future.get(20, TimeUnit.MILLISECONDS);
116 fail("Should have timed out trying to get the value.");
117 } catch (TimeoutException expected) {
118
119 } finally {
120 latch.countDown();
121 }
122 }
123
124
125
126
127
128
129 public void testCanceledFutureThrowsCancellation() throws Exception {
130
131 assertFalse(future.isDone());
132 assertFalse(future.isCancelled());
133
134 final CountDownLatch successLatch = new CountDownLatch(1);
135
136
137 new Thread(new Runnable() {
138 @Override
139 public void run() {
140 try {
141 future.get();
142 } catch (CancellationException expected) {
143 successLatch.countDown();
144 } catch (Exception ignored) {
145
146 }
147 }
148 }).start();
149
150 assertFalse(future.isDone());
151 assertFalse(future.isCancelled());
152
153 future.cancel(true);
154
155 assertTrue(future.isDone());
156 assertTrue(future.isCancelled());
157
158 assertTrue(successLatch.await(200, TimeUnit.MILLISECONDS));
159
160 latch.countDown();
161 }
162
163 public void testListenersNotifiedOnError() throws Exception {
164 final CountDownLatch successLatch = new CountDownLatch(1);
165 final CountDownLatch listenerLatch = new CountDownLatch(1);
166
167 ExecutorService exec = Executors.newCachedThreadPool();
168
169 future.addListener(new Runnable() {
170 @Override
171 public void run() {
172 listenerLatch.countDown();
173 }
174 }, exec);
175
176 new Thread(new Runnable() {
177 @Override
178 public void run() {
179 try {
180 future.get();
181 } catch (CancellationException expected) {
182 successLatch.countDown();
183 } catch (Exception ignored) {
184
185 }
186 }
187 }).start();
188
189 future.cancel(true);
190
191 assertTrue(future.isCancelled());
192 assertTrue(future.isDone());
193
194 assertTrue(successLatch.await(200, TimeUnit.MILLISECONDS));
195 assertTrue(listenerLatch.await(200, TimeUnit.MILLISECONDS));
196
197 latch.countDown();
198
199 exec.shutdown();
200 exec.awaitTermination(100, TimeUnit.MILLISECONDS);
201 }
202
203
204
205
206
207
208
209 public void testAllListenersCompleteSuccessfully()
210 throws InterruptedException, ExecutionException {
211
212 ExecutorService exec = Executors.newCachedThreadPool();
213
214 int listenerCount = 20;
215 final CountDownLatch listenerLatch = new CountDownLatch(listenerCount);
216
217
218
219 for (int i = 0; i < 20; i++) {
220
221
222 if (i == 10) {
223 new Thread(new Runnable() {
224 @Override
225 public void run() {
226 latch.countDown();
227 }
228 }).start();
229 }
230
231 future.addListener(new Runnable() {
232 @Override
233 public void run() {
234 listenerLatch.countDown();
235 }
236 }, exec);
237 }
238
239 assertSame(Boolean.TRUE, future.get());
240
241 listenerLatch.await(500, TimeUnit.MILLISECONDS);
242
243 exec.shutdown();
244 exec.awaitTermination(500, TimeUnit.MILLISECONDS);
245 }
246 }